package com.atguigu.flinksql.day13.udf;


import com.atguigu.datastream.bean.Top2Vc;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.table.functions.TableAggregateFunction;
import org.apache.flink.util.Collector;

/**
 * Table aggregate function that tracks the two largest water-level ({@code vc}) values seen
 * so far and emits each of them together with its rank (1 = largest, 2 = second largest).
 *
 * <p>The accumulator is a {@link Top2Vc} whose {@code vc1} holds the current maximum and
 * {@code vc2} the current runner-up. A slot that has not been filled yet holds
 * {@link #EMPTY}.
 */
public class Top2VcFunction extends TableAggregateFunction<Tuple2<Double, Integer>, Top2Vc> {

    /**
     * Marker for "no value stored in this slot yet".
     *
     * <p>This must compare lower than every real reading. {@code Double.MIN_VALUE} is NOT
     * suitable: it is the smallest <em>positive</em> double (about 4.9e-324), so using it as
     * the initial value silently discards every reading that is zero or negative.
     * A consequence of using negative infinity is that an input of exactly
     * {@code -Infinity} is indistinguishable from an empty slot and is never emitted.
     */
    private static final double EMPTY = Double.NEGATIVE_INFINITY;

    /**
     * Creates a fresh accumulator with both slots marked as empty.
     *
     * @return a new {@link Top2Vc} whose {@code vc1} and {@code vc2} are both {@link #EMPTY}
     */
    @Override
    public Top2Vc createAccumulator() {
        Top2Vc top2Vc = new Top2Vc();
        top2Vc.setVc1(EMPTY);
        top2Vc.setVc2(EMPTY);
        return top2Vc;
    }

    /**
     * Folds one input value into the accumulator.
     *
     * <p>A value greater than the current maximum becomes the new maximum and pushes the old
     * maximum down to second place. Otherwise, a value greater than the current runner-up
     * replaces the runner-up. Values equal to the maximum therefore do not take the first
     * slot but can take the second one.
     *
     * @param acc   accumulator to update in place
     * @param value incoming reading; {@code null} (SQL NULL) is ignored, and {@code NaN} is
     *              ignored as well because every comparison against it is false
     */
    public void accumulate(Top2Vc acc, Double value) {
        // Guard against SQL NULL: unboxing a null Double below would throw a NullPointerException.
        if (value == null) {
            return;
        }
        if (value > acc.getVc1()) {
            acc.setVc2(acc.getVc1());
            acc.setVc1(value);
        } else if (value > acc.getVc2()) {
            acc.setVc2(value);
        }
    }

    /**
     * Emits the stored values as {@code (value, rank)} pairs, skipping slots that are still
     * empty. At most two records are collected.
     *
     * @param acc accumulator holding the current top two values
     * @param out collector that receives {@code (vc, 1)} and/or {@code (vc, 2)}
     */
    public void emitValue(Top2Vc acc, Collector<Tuple2<Double, Integer>> out) {
        if (acc.getVc1() != EMPTY) {
            out.collect(Tuple2.of(acc.getVc1(), 1));
        }
        if (acc.getVc2() != EMPTY) {
            out.collect(Tuple2.of(acc.getVc2(), 2));
        }
    }
}